liburingutils: Add API to handle socket messages asynchronously Bug: 385143770 Test: Unit test Change-Id: I32ed7511e4e94e43766acb9ef3ddffc5d2eb0320 Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/Android.bp b/Android.bp index 5fa331e..f625107 100644 --- a/Android.bp +++ b/Android.bp
@@ -6,15 +6,19 @@ name: "liburingutils", srcs: [ - "src/LibUringUtils.cpp", + "src/IOUringSocketHandler.cpp", ], cflags: ["-Werror"], export_include_dirs: ["include"], + static_libs: [ + "liburing", + ], shared_libs: [ "libbase", + "liblog", ], tidy: true, @@ -32,10 +36,13 @@ } cc_test { - name: "liburingutils_tests", + name: "IOUringSocketHandler_tests", test_suites: ["device-tests"], srcs: [ - "src/LibUringUtils_test.cpp", + "src/IOUringSocketHandler_test.cpp", + ], + static_libs: [ + "liburing", ], shared_libs: [ "liburingutils", diff --git a/include/IOUringSocketHandler/IOUringSocketHandler.h b/include/IOUringSocketHandler/IOUringSocketHandler.h new file mode 100644 index 0000000..226a5b2 --- /dev/null +++ b/include/IOUringSocketHandler/IOUringSocketHandler.h
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2025 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sys/socket.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include <liburing.h>
+
+/*
+ * IOUringSocketHandler is a helper class for using io_uring with a socket.
+ *
+ * Typical usage from a given thread:
+ *
+ * As a one time setup:
+ * 1. Create an instance of IOUringSocketHandler with the socket file descriptor.
+ * 2. Set up the io_uring instance through SetupIoUring().
+ * 3. Allocate the receive buffers and register them with io_uring through
+ *    AllocateAndRegisterBuffers().
+ * 4. Call EnqueueMultishotRecvmsg() to submit the SQE that receives the data.
+ *
+ * In the I/O path:
+ *
+ * 5. Receive data from the socket through ReceiveData().
+ * 6. Release the buffer back to io_uring through ReleaseBuffer().
+ *
+ * Note that the thread which sets up the io_uring instance should handle the
+ * I/O through ReceiveData() call.
+ */
+
+class IOUringSocketHandler {
+  public:
+    // socket_fd: The socket to receive messages from. The handler only stores the descriptor;
+    //            it does not close it.
+    explicit IOUringSocketHandler(int socket_fd);
+    ~IOUringSocketHandler();
+
+    // The handler owns the io_uring instance and its registered buffers, so it is not copyable.
+    IOUringSocketHandler(const IOUringSocketHandler&) = delete;
+    IOUringSocketHandler& operator=(const IOUringSocketHandler&) = delete;
+
+    // Setup io_uring ring buffer
+    // queue_size: The size of the io_uring submission queue.
+    //             Determines the maximum number of outstanding I/O requests.
+    // return: true on success, false on failure (e.g., if io_uring_setup fails).
+    //
+    // This function initializes the io_uring context and sets up the submission
+    // and completion queues. It prepares the io_uring instance for I/O operations.
+    // A larger queue_size allows for more concurrent I/O operations but consumes
+    // more memory.
+    bool SetupIoUring(int queue_size);
+
+    // Allocate 'num_buffers' of size 'buf_size'
+    //
+    // num_buffers: The number of buffers to allocate. io_uring_setup_buf_ring() requires this
+    //              to be a power of two.
+    // buf_size: The size of each buffer in bytes.
+    // return: true on success, false if the buffers could not be registered.
+    //
+    // This function allocates a set of buffers that will be used for I/O operations
+    // with io_uring. The allocated buffers are managed internally and are registered with
+    // io_uring as a provided-buffer ring, from which the kernel picks a buffer for every
+    // received message.
+    //
+    // The buf_size will be the payload for the caller. Internally, it
+    // allocates additional metadata:
+    //   a: sizeof(struct ucred) + sizeof(struct cmsghdr)
+    //   b: sizeof(struct io_uring_recvmsg_out)
+    // This allows sender to send the ucred credential information if required.
+    //
+    // Please see additional details on how num_buffers will be used
+    // by the io_uring: https://man7.org/linux/man-pages/man3/io_uring_setup_buf_ring.3.html
+    bool AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size);
+
+    // Free up registered buffers with the io_uring instance.
+    //
+    // All the buffers allocated using AllocateAndRegisterBuffers() API will be
+    // freed and de-registered. Callers can then call
+    // AllocateAndRegisterBuffers() to re-register a new set of buffers with the
+    // ring.
+    void DeRegisterBuffers();
+
+    // Arm io_uring recvmsg opcode
+    //
+    // return: true on success, false on failure (e.g., if submission queue is full).
+    //
+    // This function enqueues a "multishot recvmsg" operation into the io_uring submission queue.
+    // Multishot recvmsg allows receiving multiple messages from a socket with a single
+    // io_uring submission. The function prepares the submission queue
+    // entry (SQE) for the recvmsg operation and submits it.
+    bool EnqueueMultishotRecvmsg();
+
+    // Release the buffer to io_uring
+    //
+    // This function releases a buffer back to the io_uring subsystem after it has been
+    // used for an I/O operation. This makes the buffer available for reuse in subsequent
+    // I/O operations. The pointers returned by the preceding ReceiveData() call must not be
+    // used after this call.
+    //
+    // Additionally, when the buffer is released, a check is done to see if
+    // the multishot request will post more CQE entries. If not, EnqueueMultishotRecvmsg()
+    // is invoked so that the SQE submission is done for receiving next set of
+    // I/O.
+    void ReleaseBuffer();
+
+    // Receive payload data of size payload_len. Additionally, receive
+    // credential data.
+    //
+    // payload: A pointer to a void pointer. This will be set to point to the received
+    //          payload data.
+    //
+    // payload_len: A reference to a size_t. This will be set to the length of the
+    //              received payload data.
+    //
+    // cred: A pointer to a struct ucred pointer. This will be set to point to the
+    //       user credentials associated with the received data (if available).
+    //       If the sender doesn't have credential information in the payload,
+    //       then nullptr will be returned.
+    //
+    // This function retrieves the data received from a recvmsg operation. It extracts the payload
+    // data and its length, as well as the user credentials associated with the sender.
+    //
+    // The payload and credentials are not copied: the returned pointers refer to memory inside
+    // a buffer owned by this handler. The caller must not free them; they are valid until
+    // ReleaseBuffer() is called.
+    //
+    // The function has no return value. Callers should initialize *payload to nullptr and
+    // payload_len to 0 before the call and check them afterwards to detect that no message
+    // was retrieved.
+    void ReceiveData(void** payload, size_t& payload_len, struct ucred** cred);
+
+    // check if io_uring is supported
+    //
+    // return: true if io_uring is supported by the kernel, false otherwise.
+    //
+    // This function checks if the io_uring feature is supported by the underlying Linux kernel.
+    static bool isIouringEnabled();
+
+  private:
+    static bool isIouringSupportedByKernel();
+    // Register buffers with io_uring
+    //
+    // return: true on success, false on failure (e.g., if io_uring_setup_buf_ring fails).
+    //
+    // This function sets up the buffer ring and adds the previously allocated buffers to it,
+    // so that the kernel can select one of them for each received message.
+    bool RegisterBuffers();
+
+    struct uring_context {
+        struct io_uring ring;
+    };
+    // Socket fd
+    int socket_;
+    std::unique_ptr<uring_context> mCtx;
+    std::vector<std::unique_ptr<uint8_t[]>> buffers_;
+    // msghdr passed to the multishot recvmsg request and used to parse its completions
+    struct msghdr msg = {};
+    // Size of the control (ancillary) data area reserved in every buffer
+    int control_len_ = 0;
+    size_t num_buffers_ = 0;
+    // Total size of every buffer: recvmsg header + control data + payload
+    int buffer_size_ = 0;
+    // Id of the buffer handed out by ReceiveData(), or -1 when there is none
+    int active_buffer_id_ = -1;
+    // Completion currently being processed
+    struct io_uring_cqe* cqe = nullptr;
+    // A constant buffer group id as we don't support multiple buffer groups
+    // yet.
+    const int bgid_ = 7;
+    struct io_uring_buf_ring* br_ = nullptr;
+    bool registered_buffers_ = false;
+    bool ring_setup_ = false;
+};
diff --git a/include/liburingutils/LibUringUtils.h b/include/liburingutils/LibUringUtils.h
deleted file mode 100644
index a6dbefc..0000000
--- a/include/liburingutils/LibUringUtils.h
+++ /dev/null
@@ -1,28 +0,0 @@ -/* - * Copyright (C) 2025 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __LIBURING_UTILS_H -#define __LIBURING_UTILS_H - -class LibUringUtils { -public: - static bool isIouringEnabled(); - -private: - static bool isIouringSupportedByKernel(); -}; - -#endif diff --git a/src/IOUringSocketHandler.cpp b/src/IOUringSocketHandler.cpp new file mode 100644 index 0000000..0c68281 --- /dev/null +++ b/src/IOUringSocketHandler.cpp
@@ -0,0 +1,310 @@
+/*
+ * Copyright (C) 2025 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "IOUringSocketHandler"
+
+#include <sys/resource.h>
+#include <sys/utsname.h>
+#include <unistd.h>
+
+#include <limits.h>
+#include <linux/time_types.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/cdefs.h>
+#include <sys/prctl.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <memory>
+#include <thread>
+
+#include <cutils/sockets.h>
+#include <private/android_logger.h>
+
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <android-base/logging.h>
+#include <android-base/scopeguard.h>
+
+// Reports whether io_uring can be used on this device. Currently this only checks the kernel
+// version.
+bool IOUringSocketHandler::isIouringEnabled() {
+    return isIouringSupportedByKernel();
+}
+
+// Parses the kernel release reported by uname() and returns true for kernel 6.1 or newer.
+bool IOUringSocketHandler::isIouringSupportedByKernel() {
+    struct utsname uts {};
+    if (uname(&uts) != 0) {
+        return false;
+    }
+
+    unsigned int major = 0;
+    unsigned int minor = 0;
+    if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
+        return false;
+    }
+
+    // We will only support kernels from 6.1 and higher.
+    return major > 6 || (major == 6 && minor >= 1);
+}
+
+IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {}
+
+// Frees the buffer ring (if any) before tearing down the ring it belongs to.
+IOUringSocketHandler::~IOUringSocketHandler() {
+    DeRegisterBuffers();
+    if (ring_setup_) {
+        io_uring_queue_exit(&mCtx->ring);
+    }
+}
+
+// Arms a multishot recvmsg on the socket that takes its buffers from buffer group bgid_.
+bool IOUringSocketHandler::EnqueueMultishotRecvmsg() {
+    if (!ring_setup_) {
+        LOG(ERROR) << "EnqueueMultishotRecvmsg failed: io_uring is not set up";
+        return false;
+    }
+
+    struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring);
+    if (sqe == nullptr) {
+        // io_uring_get_sqe() returns NULL when the submission queue is full.
+        LOG(ERROR) << "EnqueueMultishotRecvmsg failed: no free submission queue entry";
+        return false;
+    }
+
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_controllen = control_len_;
+    io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
+    sqe->flags |= IOSQE_BUFFER_SELECT;
+    sqe->buf_group = bgid_;
+    int ret = io_uring_submit(&mCtx->ring);
+    if (ret < 0) {
+        LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret;
+        return false;
+    }
+    return true;
+}
+
+// Allocates num_buffers buffers, each holding the recvmsg header, the control data and
+// buf_size bytes of payload, and registers them with the ring as a provided-buffer ring.
+bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) {
+    if (!ring_setup_) {
+        LOG(ERROR) << "AllocateAndRegisterBuffers failed: io_uring is not set up";
+        return false;
+    }
+    if (registered_buffers_) {
+        LOG(ERROR) << "AllocateAndRegisterBuffers failed: buffers are already registered";
+        return false;
+    }
+    // io_uring_setup_buf_ring() only accepts a non-zero power-of-two number of entries.
+    if (num_buffers == 0 || (num_buffers & (num_buffers - 1)) != 0) {
+        LOG(ERROR) << "AllocateAndRegisterBuffers failed: num_buffers must be a power of two: "
+                   << num_buffers;
+        return false;
+    }
+
+    const size_t control_len = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr);
+    const size_t metadata_len = sizeof(struct io_uring_recvmsg_out) + control_len;
+    // buffer_size_ is an int, so reject payload sizes that would not fit.
+    if (buf_size > static_cast<size_t>(INT_MAX) - metadata_len) {
+        LOG(ERROR) << "AllocateAndRegisterBuffers failed: buf_size is too large: " << buf_size;
+        return false;
+    }
+
+    num_buffers_ = num_buffers;
+    control_len_ = static_cast<int>(control_len);
+    buffer_size_ = static_cast<int>(metadata_len + buf_size);
+
+    // Drop any buffers left over from an earlier failed attempt.
+    buffers_.clear();
+    buffers_.reserve(num_buffers_);
+    for (size_t i = 0; i < num_buffers_; i++) {
+        buffers_.push_back(std::make_unique<uint8_t[]>(buffer_size_));
+    }
+
+    if (!RegisterBuffers()) {
+        // Nothing was registered, so this only releases the allocations and resets the sizes.
+        DeRegisterBuffers();
+        return false;
+    }
+    return true;
+}
+
+// Creates the provided-buffer ring for group bgid_ and hands every allocated buffer to it.
+bool IOUringSocketHandler::RegisterBuffers() {
+    int ret = 0;
+    br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret);
+    if (!br_) {
+        LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret;
+        return false;
+    }
+    const int mask = io_uring_buf_ring_mask(num_buffers_);
+    for (size_t i = 0; i < num_buffers_; i++) {
+        // The buffer id is the index into buffers_, which ReceiveData() relies on.
+        io_uring_buf_ring_add(br_, buffers_[i].get(), buffer_size_, i, mask, i);
+    }
+    io_uring_buf_ring_advance(br_, num_buffers_);
+    LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_;
+    registered_buffers_ = true;
+    return true;
+}
+
+// Frees the buffer ring (when one is registered) and the buffers backing it.
+void IOUringSocketHandler::DeRegisterBuffers() {
+    if (registered_buffers_) {
+        int ret = io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_);
+        if (ret < 0) {
+            LOG(ERROR) << "io_uring_free_buf_ring failed with error: " << ret;
+        }
+        // Reset the state so that a second call, e.g. from the destructor, does not free the
+        // ring again.
+        br_ = nullptr;
+        registered_buffers_ = false;
+    }
+    buffers_.clear();
+    num_buffers_ = 0;
+    control_len_ = 0;
+    buffer_size_ = 0;
+    // The buffer this id referred to no longer exists.
+    active_buffer_id_ = -1;
+}
+
+// Creates the io_uring instance with queue_size + 1 submission queue entries.
+bool IOUringSocketHandler::SetupIoUring(int queue_size) {
+    if (ring_setup_) {
+        // Replacing mCtx here would leak the ring that is already initialized.
+        LOG(ERROR) << "SetupIoUring failed: io_uring is already set up";
+        return false;
+    }
+    if (queue_size < 0 || queue_size == INT_MAX) {
+        LOG(ERROR) << "SetupIoUring failed: invalid queue_size: " << queue_size;
+        return false;
+    }
+
+    mCtx = std::make_unique<uring_context>();
+    struct io_uring_params params = {};
+
+    // COOP_TASKRUN - No IPI to logd
+    // SINGLE_ISSUER - Only one thread is doing the work on the ring
+    // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
+    // DEFER_TASKRUN - trigger task work when CQE is explicitly polled
+    params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
+                     IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
+
+    int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, &params);
+    if (ret) {
+        LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret;
+        mCtx.reset();
+        return false;
+    }
+    LOG(INFO) << "io_uring_queue_init_params success";
+
+    ring_setup_ = true;
+    return true;
+}
+
+// Returns the buffer handed out by the last ReceiveData() to the buffer ring, consumes its
+// CQE and re-arms the multishot recvmsg when the kernel has terminated it.
+void IOUringSocketHandler::ReleaseBuffer() {
+    if (active_buffer_id_ == -1) {
+        return;
+    }
+
+    // Read the flags before the CQ head is advanced below: once the entry is consumed the
+    // kernel may reuse it, so cqe must not be dereferenced afterwards.
+    const bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE) != 0;
+
+    // Put the buffer back to the pool
+    io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
+                          io_uring_buf_ring_mask(num_buffers_), 0);
+    io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
+    active_buffer_id_ = -1;
+    cqe = nullptr;
+
+    // If there are no more CQE data, re-arm the SQE
+    if (!is_more_cqe) {
+        EnqueueMultishotRecvmsg();
+    }
+}
+
+// Waits for the next completion and exposes the payload and (optional) sender credentials
+// that the kernel placed in the selected buffer. The outputs are null/zero on failure.
+void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
+    // Give the outputs a defined value for every early return below.
+    *payload = nullptr;
+    payload_len = 0;
+    *cred = nullptr;
+
+    if (!ring_setup_) {
+        LOG(ERROR) << "ReceiveData failed: io_uring is not set up";
+        return;
+    }
+
+    if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
+        int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
+        if (ret) {
+            LOG(ERROR) << "WaitCqe failed: " << ret;
+            EnqueueMultishotRecvmsg();
+            return;
+        }
+    }
+
+    if (cqe->res < 0) {
+        io_uring_cqe_seen(&mCtx->ring, cqe);
+        EnqueueMultishotRecvmsg();
+        return;
+    }
+
+    // Only trust the buffer id when the completion really carries one of our buffers.
+    const unsigned int buffer_id = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
+    if (!(cqe->flags & IORING_CQE_F_BUFFER) || buffer_id >= buffers_.size()) {
+        LOG(ERROR) << "ReceiveData failed: completion without a valid buffer, flags: "
+                   << cqe->flags;
+        const bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE) != 0;
+        io_uring_cqe_seen(&mCtx->ring, cqe);
+        if (!is_more_cqe) {
+            EnqueueMultishotRecvmsg();
+        }
+        return;
+    }
+    active_buffer_id_ = static_cast<int>(buffer_id);
+
+    void* this_recv = buffers_[active_buffer_id_].get();
+    struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
+    if (!o) {
+        // active_buffer_id_ stays set so that ReleaseBuffer() recycles the buffer and consumes
+        // the CQE.
+        return;
+    }
+
+    struct ucred* cr = nullptr;
+    for (struct cmsghdr* cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg); cmsg != nullptr;
+         cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg)) {
+        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
+            cr = reinterpret_cast<struct ucred*>(CMSG_DATA(cmsg));
+            break;
+        }
+    }
+
+    *payload = io_uring_recvmsg_payload(o, &msg);
+    payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
+    *cred = cr;
+}
diff --git a/src/LibUringUtils_test.cpp b/src/IOUringSocketHandler_test.cpp
similarity index 62%
rename from src/LibUringUtils_test.cpp
rename to src/IOUringSocketHandler_test.cpp
index aff5206..67361f6 100644
--- a/src/LibUringUtils_test.cpp
+++ b/src/IOUringSocketHandler_test.cpp
@@ -14,20 +14,35 @@
  * limitations under the License.
  */
 
-#include <liburingutils/LibUringUtils.h>
+#include <sys/utsname.h>
+
+#include <cstdio>
+
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
 
 #include <gtest/gtest.h>
 
-class LibUringUtilsTest : public testing::Test {
+class IOUringSocketHandlerTest : public testing::Test {
   public:
     void testIsIouringEnabled(bool expectedResult) {
-        EXPECT_EQ(LibUringUtils::isIouringEnabled(), expectedResult);
+        EXPECT_EQ(IOUringSocketHandler::isIouringEnabled(), expectedResult);
     }
+
+    // Returns true when the running kernel reports release 6.1 or newer, which is the
+    // condition under which isIouringEnabled() is expected to return true.
+    static bool kernelIsAtLeast6_1() {
+        struct utsname uts {};
+        unsigned int major = 0;
+        unsigned int minor = 0;
+        if (uname(&uts) != 0 || sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
+            return false;
+        }
+        return major > 6 || (major == 6 && minor >= 1);
+    }
 };
 
-TEST_F(LibUringUtilsTest, ReturnsIouringNotEnabled) {
-    // TODO: b/385143770 - Change this test to base on the real OS version,
-    // this default expected value is true for the binary built from the
-    // latest version of source code.
-    testIsIouringEnabled(true);
+TEST_F(IOUringSocketHandlerTest, MatchesKernelVersion) {
+    // The expectation follows the kernel the test runs on instead of a hard-coded value.
+    // TODO: b/385143770 - Also check the Liburing version.
+    testIsIouringEnabled(kernelIsAtLeast6_1());
 }
diff --git a/src/LibUringUtils.cpp b/src/LibUringUtils.cpp
deleted file mode 100644
index 25151d3..0000000
--- a/src/LibUringUtils.cpp
+++ /dev/null
@@ -1,41 +0,0 @@ -/* - * Copyright (C) 2025 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define LOG_TAG "LibUringUtils" - -#include <android-base/strings.h> -#include <liburingutils/LibUringUtils.h> -#include <sys/resource.h> -#include <sys/utsname.h> -#include <unistd.h> - -bool LibUringUtils::isIouringEnabled() { - // TODO: b/385143770 - Change this behavior to also check the Liburing version. - return isIouringSupportedByKernel(); -} - -bool LibUringUtils::isIouringSupportedByKernel() { - struct utsname uts {}; - unsigned int major, minor; - - uname(&uts); - if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) { - return false; - } - - // We will only support kernels from 6.1 and higher. - return major > 6 || (major == 6 && minor >= 1); -}